其他
干货 | 携程基于Quasar协程的NIO实践
作者简介
Ryan,携程Java开发工程师,对高并发、网络编程等领域有浓厚兴趣。
一、Java异步编程与非阻塞IO
1.1 Java中的异步工具
HttpResponse a = getA();
HttpResponse b ;
if(a.getBody().equals("1")){
b=getB1();
}
else{
b=getB2();
}
String ans=b.getBody();
private CompletableFuture<HttpResponse> getA();
private CompletableFuture<HttpResponse> getB1();
private CompletableFuture<HttpResponse> getB2();
String ans = getA()
.thenCompose(a -> {
if (a.getBody().equals("1")) {
return getB1();
} else {
return getB2();
}
}).get()
.getBody();
使用CompletableFuture的链式回调后,代码变得不友好。RxJava等框架同样具有这个问题。这类反应式的编程工具更适合于数据流的传递。对于if/else、switch/case,乃至while/for、break/continue这类过程控制语句,实现与维护的难度都很大。业务系统需要类似于线程的同步等待,同时具有低资源消耗的编码工具,配合 NIO使用。当时使用NIO时,由于可以不占用线程,可以使用一种资源消耗更小的协程来等待。
1.2 协程
1.3 Quasar任务调度原理
public String instrumentDemo(){
initial();
String ans = getFromNIO();
return ans;
}
二、系统异步IO改造
new Fiber(()->{
//方法体
}).start();
2.1 整合Netty与Quasar
AsyncCompletionStage.get(future)
//创建HttpClient
AsyncHttpClient httpClient = Dsl.asyncHttpClient();
//创建请求
Request request = createRequest();
//将网络请求交给HttpClient执行
CompletableFuture<Response> future = httpClient.executeRequest(request)
.toCompletableFuture();
//通过Quasar挂起协程
Response response = AsyncCompletionStage.get(future);
//获取网络结果后,通过future传递response并唤醒协程重新执行
deal(response);
过程可由下图表示。
Quasar框架AsyncCompletionStage.get内部完成的工作相当于,在HttpClient返回的future上注册回调,回调的内容是“IO操作完成后通知调度器唤醒协程”,这样将NIO异步回调全部操作封装在协程调度器中,用户代码看起来是同步等待的形式,避免了自行实现回调处理带来的繁琐,解决了前文所述的回调地狱。
2.2 声明挂起方法
<plugin>
<groupId>com.vlkan</groupId>
<artifactId>quasar-maven-plugin</artifactId>
<version>0.7.9</version>
<executions>
<execution>
<goals>
<goal>instrument</goal>
</goals>
</execution>
</executions>
</plugin>
public void startFiber() throws ExecutionException, InterruptedException {
Fiber<Void> fiber = new Fiber<Void>(() -> {
//不用继续抛出异常
Response response = waitNextLayer1();
deal(response);
}).start();
}
private Response waitNextLayer1() throws SuspendExecution {
return waitNextLayer2();
}
private Response waitNextLayer2() throws SuspendExecution {
CompletableFuture<Response> future = httpClient.executeRequest(request)
.toCompletableFuture();
try {
// Quasar框架工具类抛出SuspendExecution
return AsyncCompletionStage.get(future);
} catch (Exception e) {
return null;
}
}
2.3 异步RPC调用
interface Callback<TResponse> {
void callback(TResponse TResponse, Exception e);
}
CompletableFuture<Response> future=new CompletableFuture<>();
//调用hello接口的异步API
new RpcClient().helloAsync(request, new Callback<Response>() {
public void callback(Response response, Exception e) {
if (e == null) future.complete(response);
else future.completeExceptionally(e);
}
});
//在此处调用Quasar的API,挂起直至RPC调用完成
Response response = AsyncCompletionStage.get(future);
@FunctionalInterface
private interface RpcAsyncCall<TRequest, TResponse> {
void request(TRequest request, Callback<TResponse> callback);
}
public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution {
CompletableFuture<TResponse> future = new CompletableFuture<>();
call.request(request, (response, e) -> {
if (e == null) future.complete(response);
else future.completeExceptionally(e);
});
try {
//使用Quasar等待Future结果
return AsyncCompletionStage.get(future);
} catch (Exception e) {
return null;
}
}
最后的调用可简化一行代码,该方法适用于所有该Rpc框架提供的异步接口。
Response response= waitRpc(new RpcClient()::helloAsync, request);
2.4 阻塞操作的处理
public void waitBlocking() throws SuspendExecution {
//从DB获取结果
String ans = waitBlocking(this::selectFromDB);
}
private ExecutorService threadPool = Executors.newCachedThreadPool();
private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution {
CompletableFuture<T> future = new CompletableFuture<>();
threadPool.submit(() -> {
T ans = supplier.get();
future.complete(ans);
});
try {
return AsyncCompletionStage.get(future);
} catch (Exception e) {
return null;
}
}
2.5 并发工具的使用
三、总结
3.1 限制与风险
3.2 总结与展望
数据库压力降低90%,携程机票订单缓存系统实践 五大实例详解,携程 Redis 跨机房双向同步实践 QMQ在携程的落地实践 查询耗时降低2/3,携程度假搜索引擎架构优化 《携程架构实践》《携程人工智能实践》上市啦!
“携程技术”公众号后台回复“新书”,
可免费获得两本书的试读样章~
《携程架构实践》
京东
当当
《携程人工智能实践》
京东
当当
“携程技术”公众号
分享,交流,成长